K-Nearest Neighbors Time Series Regressor (KNN-TS) — analog forecasting from scratch#

A KNN time-series regressor predicts by finding similar past windows (“analogs”) and averaging what happened next.

Why it’s useful:

  • non-parametric baseline (no parametric trend/seasonality assumptions)

  • can model nonlinear patterns if they repeat in history

  • naturally provides a distribution of plausible next values via neighbor targets (confidence intuition)

Goals#

  • Implement a KNeighborsTimeSeriesRegressor in NumPy (brute-force neighbors)

  • Show the key design choices: windowing, distance metric, normalization, weighting

  • Plot with Plotly:

    • the raw series

    • 1-step ahead forecasts with an empirical prediction band from neighbors

    • a “fan” view to build confidence intuition

1) Time series forecasting as supervised regression#

Given a univariate series \(y_0, y_1, \dots, y_{T-1}\), choose:

  • window length \(L\) (how many past points to compare)

  • forecast horizon \(h\) (predict how far ahead)

Define the feature window and target:

\[ \mathbf{x}_t = [y_{t-L},\; y_{t-L+1},\; \dots,\; y_{t-1}]^\top \in \mathbb{R}^L, \qquad y^{(\mathrm{target})}_t = y_{t+h-1}, \]

for \(t = L, L+1, \dots, T-h\).

This builds a supervised dataset \((\mathbf{x}_t, y^{(\mathrm{target})}_t)\).
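As a minimal sketch of this windowing for \(L = 3\), \(h = 1\) on a toy series (NumPy's `sliding_window_view` is a shortcut for the explicit loop implemented later in the notebook):

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

y = np.array([10.0, 11.0, 13.0, 12.0, 14.0, 15.0])  # T = 6
L, h = 3, 1

# Windows x_t = [y_{t-L}, ..., y_{t-1}] and targets y_{t+h-1}, for t = L, ..., T-h
n_samples = y.size - L - h + 1
X = sliding_window_view(y, L)[:n_samples]
y_target = y[L + h - 1 :]

print(X)         # 3 windows of length 3
print(y_target)  # each target is the point right after its window
```

With \(T = 6\) this yields three (window, target) pairs, matching the range \(t = 3, 4, 5\).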

2) KNN regression on windows#

Given a query window \(\mathbf{x}\), compute distances to all training windows \(\{\mathbf{x}_i\}_{i=1}^n\):

\[ d_i = d(\mathbf{x}, \mathbf{x}_i). \]

Let \(\mathcal{N}_k(\mathbf{x})\) be the indices of the \(k\) smallest distances.

Uniform-weight KNN regressor#

\[ \hat{y}(\mathbf{x}) = \frac{1}{k} \sum_{i\in\mathcal{N}_k(\mathbf{x})} y_i. \]

Distance-weighted KNN regressor#

A common choice is inverse-distance weights:

\[ w_i = \frac{1}{d_i + \varepsilon}, \qquad \hat{y}(\mathbf{x}) = \frac{\sum_{i\in\mathcal{N}_k(\mathbf{x})} w_i y_i}{\sum_{i\in\mathcal{N}_k(\mathbf{x})} w_i}. \]
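A tiny numeric check of the two estimators, with hypothetical distances and neighbor targets for \(k = 3\):

```python
import numpy as np

d = np.array([0.5, 1.0, 4.0])         # hypothetical distances to the 3 nearest windows
y_nei = np.array([10.0, 12.0, 20.0])  # their next-step targets
eps = 1e-8

y_uniform = y_nei.mean()                    # (10 + 12 + 20) / 3 = 14.0
w = 1.0 / (d + eps)                         # inverse-distance weights, ~[2, 1, 0.25]
y_weighted = np.sum(w * y_nei) / np.sum(w)  # ~11.38, pulled toward the closest neighbor

print(y_uniform, y_weighted)
```

The distant outlier at 20.0 shifts the uniform mean much more than the weighted one.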

Distance choice matters#

  • Euclidean on raw windows matches level + shape.

  • Z-normalized Euclidean (normalize each window to mean 0, std 1) matches shape more than level.

  • DTW (dynamic time warping) matches shapes that may be slightly time-warped.

This notebook implements Euclidean (fast, vectorized) and DTW (slower, but instructive).
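A quick illustration of why the first two choices differ, using two windows with identical shape at different levels (a minimal sketch):

```python
import numpy as np

a = np.array([1.0, 2.0, 3.0, 2.0])
b = a + 10.0  # same shape, shifted level


def znorm(x, eps=1e-8):
    # Normalize a window to mean 0, std 1 (shape matching)
    return (x - x.mean()) / (x.std() + eps)


d_raw = float(np.linalg.norm(a - b))                  # dominated by the level shift: 20.0
d_shape = float(np.linalg.norm(znorm(a) - znorm(b)))  # ~0: the shapes are identical

print(d_raw, d_shape)
```

Raw Euclidean distance treats `b` as far from `a`; the z-normalized distance treats them as the same analog.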

import numpy as np
import pandas as pd

import plotly
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio

from sklearn.metrics import mean_absolute_error, mean_squared_error

pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"

rng = np.random.default_rng(7)

print("numpy:", np.__version__)
print("pandas:", pd.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
plotly: 6.5.2

3) Synthetic series with repeating structure#

KNN works well when patterns repeat. We’ll simulate a series with trend + multiple seasonalities + noise.

n = 520
t = np.arange(n)
dates = pd.date_range("2020-01-01", periods=n, freq="D")

trend = 0.03 * t
season_30 = 2.2 * np.sin(2 * np.pi * t / 30)
season_7 = 0.9 * np.sin(2 * np.pi * t / 7)
noise = rng.normal(0.0, 0.9, size=n)

# A few shock events
shocks = np.zeros(n)
shock_idx = rng.choice(np.arange(40, n - 40), size=10, replace=False)
shocks[shock_idx] = rng.normal(0.0, 4.0, size=len(shock_idx))

y = 50 + trend + season_30 + season_7 + noise + shocks

fig = go.Figure()
fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="y"))
fig.update_layout(height=350, title="Synthetic series (trend + seasonality + noise)", xaxis_title="Time", yaxis_title="Value")
fig.show()
def make_supervised_windows(
    y: np.ndarray,
    *,
    window_length: int,
    horizon: int = 1,
    stride: int = 1,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Turn a 1D series into (X, y_target, t_target).

    X[i] is a length-L window, and y_target[i] is the value horizon steps after the window end.
    t_target[i] is the time index of y_target[i] in the original series.
    """
    y = np.asarray(y, dtype=float)
    L = int(window_length)
    h = int(horizon)
    s = int(stride)
    if L < 2:
        raise ValueError("window_length must be >= 2")
    if h < 1:
        raise ValueError("horizon must be >= 1")
    if s < 1:
        raise ValueError("stride must be >= 1")

    last_start = y.size - L - h
    if last_start < 0:
        raise ValueError("y is too short for the chosen window_length/horizon")

    starts = np.arange(0, last_start + 1, s)
    n_samples = len(starts)

    X = np.empty((n_samples, L), dtype=float)
    y_target = np.empty(n_samples, dtype=float)
    t_target = np.empty(n_samples, dtype=int)

    for i, start in enumerate(starts):
        end = start + L
        target_idx = end + h - 1
        X[i] = y[start:end]
        y_target[i] = y[target_idx]
        t_target[i] = target_idx

    return X, y_target, t_target


L = 60
horizon = 1

X, y_next, t_next = make_supervised_windows(y, window_length=L, horizon=horizon, stride=1)

h_test = 120
X_train, y_train = X[:-h_test], y_next[:-h_test]
X_test, y_test = X[-h_test:], y_next[-h_test:]
t_test = t_next[-h_test:]

print("X_train:", X_train.shape, "| X_test:", X_test.shape)
print("y_train:", y_train.shape, "| y_test:", y_test.shape)
X_train: (340, 60) | X_test: (120, 60)
y_train: (340,) | y_test: (120,)
def zscore_per_series(X: np.ndarray, *, eps: float = 1e-8) -> np.ndarray:
    """Z-normalize each sample (and each dim) across time."""
    X = np.asarray(X, dtype=float)
    if X.ndim == 2:
        mu = X.mean(axis=1, keepdims=True)
        sd = X.std(axis=1, keepdims=True)
        return (X - mu) / (sd + eps)
    if X.ndim == 3:
        mu = X.mean(axis=2, keepdims=True)
        sd = X.std(axis=2, keepdims=True)
        return (X - mu) / (sd + eps)
    raise ValueError("X must be 2D or 3D")


def pairwise_euclidean_distances(X_query: np.ndarray, X_train: np.ndarray) -> np.ndarray:
    """Fast pairwise Euclidean distances for 2D arrays (n_query, p) vs (n_train, p)."""
    X_query = np.asarray(X_query, dtype=float)
    X_train = np.asarray(X_train, dtype=float)
    if X_query.ndim != 2 or X_train.ndim != 2:
        raise ValueError("X_query and X_train must be 2D arrays")
    if X_query.shape[1] != X_train.shape[1]:
        raise ValueError("X_query and X_train must have the same number of features")

    q_sq = np.sum(X_query**2, axis=1, keepdims=True)
    t_sq = np.sum(X_train**2, axis=1)
    d2 = q_sq + t_sq[None, :] - 2.0 * (X_query @ X_train.T)
    d2 = np.maximum(d2, 0.0)
    return np.sqrt(d2)


def dtw_distance_1d(a: np.ndarray, b: np.ndarray, *, window: int | None = None) -> float:
    """DTW distance between two 1D sequences using squared-cost DP (optionally banded)."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    n = a.size
    m = b.size

    if window is None:
        window = max(n, m)
    window = int(window)
    window = max(window, abs(n - m))

    D = np.full((n + 1, m + 1), np.inf)
    D[0, 0] = 0.0

    for i in range(1, n + 1):
        j_start = max(1, i - window)
        j_end = min(m, i + window)
        for j in range(j_start, j_end + 1):
            cost = (a[i - 1] - b[j - 1]) ** 2
            D[i, j] = cost + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])

    return float(np.sqrt(D[n, m]))


def pairwise_dtw_distances(X_query: np.ndarray, X_train: np.ndarray, *, window: int | None = None) -> np.ndarray:
    X_query = np.asarray(X_query, dtype=float)
    X_train = np.asarray(X_train, dtype=float)
    if X_query.ndim != 2 or X_train.ndim != 2:
        raise ValueError("X_query and X_train must be 2D arrays")
    if X_query.shape[1] != X_train.shape[1]:
        raise ValueError("DTW here assumes equal-length windows")

    out = np.empty((X_query.shape[0], X_train.shape[0]), dtype=float)
    for i in range(X_query.shape[0]):
        for j in range(X_train.shape[0]):
            out[i, j] = dtw_distance_1d(X_query[i], X_train[j], window=window)
    return out


def weighted_quantile_1d(values: np.ndarray, quantiles: np.ndarray, *, weights: np.ndarray) -> np.ndarray:
    """Weighted quantiles for 1D values. quantiles in [0,1]."""
    values = np.asarray(values, dtype=float)
    weights = np.asarray(weights, dtype=float)
    quantiles = np.asarray(quantiles, dtype=float)
    if values.ndim != 1 or weights.ndim != 1:
        raise ValueError("values and weights must be 1D")
    if values.size != weights.size:
        raise ValueError("values and weights must have the same length")
    if np.any(quantiles < 0) or np.any(quantiles > 1):
        raise ValueError("quantiles must be in [0,1]")

    sorter = np.argsort(values)
    v = values[sorter]
    w = weights[sorter]
    w_sum = np.sum(w)
    if w_sum <= 0:
        raise ValueError("sum of weights must be > 0")

    cdf = np.cumsum(w) / w_sum
    return np.interp(quantiles, cdf, v)
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
    """Accept (n,m) or (n,d,m) and return (n,d,m)."""
    X = np.asarray(X, dtype=float)
    if X.ndim == 2:
        return X[:, None, :]
    if X.ndim == 3:
        return X
    raise ValueError("X must have shape (n,m) or (n,d,m)")


class KNeighborsTimeSeriesRegressor:
    """Brute-force KNN regressor for fixed-length time-series windows.

    Parameters
    ----------
    n_neighbors : int
        Number of neighbors (k).
    weights : {"uniform","distance"}
        Uniform average or inverse-distance weighting.
    metric : {"euclidean","dtw"}
        Distance metric between windows.
    normalize : {None,"zscore","global"}
        - None: use raw windows
        - "zscore": normalize each window to mean 0 and std 1 (shape matching)
        - "global": normalize using training mean/std (level + scale normalization)
    dtw_window : int | None
        Sakoe-Chiba band size for DTW (None -> unconstrained).
    eps : float
        Small constant for inverse-distance weights.
    """

    def __init__(
        self,
        *,
        n_neighbors: int = 10,
        weights: str = "uniform",
        metric: str = "euclidean",
        normalize: str | None = "zscore",
        dtw_window: int | None = None,
        eps: float = 1e-8,
    ):
        self.n_neighbors = int(n_neighbors)
        self.weights = str(weights)
        self.metric = str(metric)
        self.normalize = normalize
        self.dtw_window = dtw_window
        self.eps = float(eps)

    def fit(self, X, y):
        X3 = _as_3d_panel(X)
        y = np.asarray(y, dtype=float).reshape(-1)
        if X3.shape[0] != y.size:
            raise ValueError("X and y must have the same number of samples")
        if self.n_neighbors < 1:
            raise ValueError("n_neighbors must be >= 1")
        if self.weights not in {"uniform", "distance"}:
            raise ValueError("weights must be 'uniform' or 'distance'")
        if self.metric not in {"euclidean", "dtw"}:
            raise ValueError("metric must be 'euclidean' or 'dtw'")
        if self.normalize not in {None, "zscore", "global"}:
            raise ValueError("normalize must be None, 'zscore', or 'global'")

        self.n_train_ = int(X3.shape[0])
        self.n_dims_ = int(X3.shape[1])
        self.n_timepoints_ = int(X3.shape[2])

        self.X_train_raw_ = X3.copy()
        self.y_train_ = y.copy()

        if self.normalize == "global":
            mu = X3.mean(axis=(0, 2), keepdims=True)
            sd = X3.std(axis=(0, 2), keepdims=True)
            self._mu_ = mu
            self._sd_ = sd
            self.X_train_ = (X3 - mu) / (sd + self.eps)
        elif self.normalize == "zscore":
            self.X_train_ = zscore_per_series(X3, eps=self.eps)
        else:
            self.X_train_ = X3

        return self

    def _transform(self, X) -> np.ndarray:
        X3 = _as_3d_panel(X)
        if X3.shape[1] != self.n_dims_ or X3.shape[2] != self.n_timepoints_:
            raise ValueError(
                f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_}) for univariate)"
            )

        if self.normalize == "global":
            return (X3 - self._mu_) / (self._sd_ + self.eps)
        if self.normalize == "zscore":
            return zscore_per_series(X3, eps=self.eps)
        return X3

    def _pairwise_distances(self, X_query) -> np.ndarray:
        Q3 = self._transform(X_query)
        T3 = self.X_train_

        if self.metric == "euclidean":
            Q2 = Q3.reshape(Q3.shape[0], -1)
            T2 = T3.reshape(T3.shape[0], -1)
            return pairwise_euclidean_distances(Q2, T2)

        # DTW implementation here is univariate for simplicity.
        if self.n_dims_ != 1:
            raise ValueError("DTW metric currently supports only univariate windows")
        Q2 = Q3[:, 0, :]
        T2 = T3[:, 0, :]
        return pairwise_dtw_distances(Q2, T2, window=self.dtw_window)

    def kneighbors(self, X_query) -> tuple[np.ndarray, np.ndarray]:
        dists = self._pairwise_distances(X_query)
        k = min(self.n_neighbors, self.n_train_)

        # argpartition gives k smallest in arbitrary order
        idx = np.argpartition(dists, kth=k - 1, axis=1)[:, :k]
        row = np.arange(dists.shape[0])[:, None]
        dist_k = dists[row, idx]

        # sort neighbors by distance
        order = np.argsort(dist_k, axis=1)
        idx_sorted = idx[row, order]
        dist_sorted = dist_k[row, order]
        return dist_sorted, idx_sorted

    def predict(self, X_query) -> np.ndarray:
        dist, idx = self.kneighbors(X_query)
        yk = self.y_train_[idx]

        if self.weights == "uniform":
            return yk.mean(axis=1)

        w = 1.0 / (dist + self.eps)
        w = w / np.sum(w, axis=1, keepdims=True)
        return np.sum(w * yk, axis=1)

    def neighbor_targets(self, X_query) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Return (distances, indices, neighbor y-values) for each query window."""
        dist, idx = self.kneighbors(X_query)
        return dist, idx, self.y_train_[idx]
model = KNeighborsTimeSeriesRegressor(
    n_neighbors=25,
    weights="distance",
    metric="euclidean",
    normalize="zscore",
)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)

# Empirical "prediction band" from the neighbor targets.
dist, idx, y_nei = model.neighbor_targets(X_test)

if model.weights == "distance":
    w = 1.0 / (dist + model.eps)
    q = weighted_quantile_1d
    qs = np.array([0.05, 0.50, 0.95])
    q_mat = np.vstack([q(y_nei[i], qs, weights=w[i]) for i in range(y_nei.shape[0])])
    q05, q50, q95 = q_mat[:, 0], q_mat[:, 1], q_mat[:, 2]
else:
    q05, q50, q95 = np.quantile(y_nei, [0.05, 0.50, 0.95], axis=1)

mae = mean_absolute_error(y_test, y_pred)
rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
print(f"MAE:  {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
MAE:  6.285
RMSE: 6.434
test_dates = dates[t_test]

fig = go.Figure()
fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="y", line=dict(color="#444")))

# 90% band from neighbor targets
fig.add_trace(
    go.Scatter(x=test_dates, y=q95, mode="lines", line=dict(width=0), showlegend=False)
)
fig.add_trace(
    go.Scatter(
        x=test_dates,
        y=q05,
        mode="lines",
        line=dict(width=0),
        fill="tonexty",
        fillcolor="rgba(31,119,180,0.20)",
        name="90% neighbor band",
    )
)

fig.add_trace(go.Scatter(x=test_dates, y=y_test, mode="lines", name="Actual (test)", line=dict(color="#000")))
fig.add_trace(go.Scatter(x=test_dates, y=y_pred, mode="lines", name="KNN forecast", line=dict(color="#1f77b4")))

fig.update_layout(
    height=450,
    title="1-step ahead forecast via KNN windows (with neighbor-based band)",
    xaxis_title="Time",
    yaxis_title="Value",
)
fig.show()

4) Confidence intuition: the neighbor target distribution#

KNN doesn’t give a parametric confidence interval, but it naturally gives a set of neighbor targets.

If the nearest neighbors all lead to similar next-step values, the forecast is “confident”. If neighbor targets vary widely, you should expect more uncertainty.
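A throwaway numeric version of that intuition, with hypothetical neighbor-target sets for two queries:

```python
import numpy as np

tight = np.array([12.1, 12.3, 11.9, 12.0, 12.2])  # neighbors agree -> confident
wide = np.array([8.0, 15.5, 11.0, 18.2, 9.4])     # neighbors disagree -> uncertain

# The spread of neighbor targets serves as a simple per-query uncertainty score
print(tight.std(), wide.std())
```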

i = 35  # pick a test point
xq = X_test[i : i + 1]
y_true = float(y_test[i])
y_hat = float(y_pred[i])

dist1, idx1, yk = model.neighbor_targets(xq)
dist1 = dist1[0]
idx1 = idx1[0]
yk = yk[0]

fig = make_subplots(rows=1, cols=2, column_widths=[0.55, 0.45], subplot_titles=("Query vs nearest windows", "Neighbor targets"))

# Left: overlay neighbor windows (aligned by relative time within the window)
rel_t = np.arange(L)
for j in range(min(12, len(idx1))):
    fig.add_trace(
        go.Scatter(
            x=rel_t,
            y=X_train[idx1[j]],
            mode="lines",
            line=dict(color="rgba(31,119,180,0.18)", width=1),
            showlegend=False,
        ),
        row=1,
        col=1,
    )

fig.add_trace(
    go.Scatter(x=rel_t, y=xq[0], mode="lines", name="Query window", line=dict(color="#000", width=3)),
    row=1,
    col=1,
)

# Right: histogram of neighbor targets
fig.add_trace(go.Histogram(x=yk, nbinsx=18, name="Neighbor y", marker=dict(color="rgba(31,119,180,0.65)")), row=1, col=2)

for xval, name, color in [
    (y_true, "Actual", "#000"),
    (y_hat, "Pred", "#1f77b4"),
    (float(np.median(yk)), "Median(nei)", "#ff7f0e"),
]:
    fig.add_vline(x=xval, line_width=2, line_dash="dash", line_color=color, row=1, col=2)

fig.update_xaxes(title_text="Window time index", row=1, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_xaxes(title_text="Next value", row=1, col=2)
fig.update_yaxes(title_text="Count", row=1, col=2)

fig.update_layout(height=420, title="KNN confidence intuition via neighbor dispersion")
fig.show()

5) Forecasting beyond the observed data (recursive)#

To forecast multiple steps ahead, one simple approach is recursive forecasting:

  1. predict the next point from the last window,

  2. append it to the history,

  3. repeat.

We’ll also propagate a simple fan by taking quantiles of the neighbor targets at each step (heuristic, but visually useful).

model_full = KNeighborsTimeSeriesRegressor(
    n_neighbors=25,
    weights="distance",
    metric="euclidean",
    normalize="zscore",
)

# Fit on all available windows.
X_all, y_all, t_all = make_supervised_windows(y, window_length=L, horizon=1, stride=1)
model_full.fit(X_all, y_all)


def recursive_forecast_with_band(
    model: KNeighborsTimeSeriesRegressor,
    y_hist: np.ndarray,
    *,
    window_length: int,
    steps: int,
    qs=(0.05, 0.50, 0.95),
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    y_hist = np.asarray(y_hist, dtype=float)
    L = int(window_length)
    out = np.zeros((steps, len(qs)), dtype=float)

    buf = y_hist.tolist()
    for s in range(steps):
        xq = np.asarray(buf[-L:], dtype=float)[None, :]
        dist, _, yk = model.neighbor_targets(xq)
        dist = dist[0]
        yk = yk[0]

        if model.weights == "distance":
            w = 1.0 / (dist + model.eps)
            out[s] = weighted_quantile_1d(yk, np.asarray(qs), weights=w)
        else:
            out[s] = np.quantile(yk, qs)

        # Recurse on the middle quantile (the median with the default qs), which is robust to outliers
        buf.append(float(out[s, 1]))

    q05, q50, q95 = out[:, 0], out[:, 1], out[:, 2]
    return q50, q05, q95


n_future = 60
fc_med, fc_lo, fc_hi = recursive_forecast_with_band(model_full, y, window_length=L, steps=n_future)
future_dates = pd.date_range(dates[-1] + pd.Timedelta(days=1), periods=n_future, freq="D")

fig = go.Figure()
fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="History", line=dict(color="#444")))

fig.add_trace(go.Scatter(x=future_dates, y=fc_hi, mode="lines", line=dict(width=0), showlegend=False))
fig.add_trace(
    go.Scatter(
        x=future_dates,
        y=fc_lo,
        mode="lines",
        line=dict(width=0),
        fill="tonexty",
        fillcolor="rgba(31,119,180,0.20)",
        name="90% band (heuristic)",
    )
)
fig.add_trace(go.Scatter(x=future_dates, y=fc_med, mode="lines", name="Forecast (median)", line=dict(color="#1f77b4", width=3)))

fig.update_layout(height=450, title="Recursive KNN forecast with fan-style band", xaxis_title="Time", yaxis_title="Value")
fig.show()

Practical notes + typical use cases#

When KNN-TS is a good fit#

  • Analog forecasting: the future behaves like the past when similar patterns occur

  • Small data: you want a strong baseline without heavy training

  • Nonlinear but repetitive dynamics

  • You want an easy-to-explain model: “these past windows looked most like today”

Pitfalls#

  • Scaling/normalization is crucial: distances are the model.

  • Large windows increase dimensionality → neighbors become less meaningful (curse of dimensionality).

  • Brute-force KNN is \(O(n\,L)\) per query for Euclidean (and slower for DTW). For large \(n\), you need approximations / indexing.

  • Non-stationarity (level shifts, changing seasonality) can break “similarity” unless you normalize or add features.
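The dimensionality pitfall is easy to demonstrate: for i.i.d. Gaussian "windows", nearest and farthest distances concentrate as the window length grows, so "nearest" carries less information (a rough illustration, not tied to the notebook's series):

```python
import numpy as np

rng = np.random.default_rng(0)
ratios = {}
for p in (2, 60, 600):  # window lengths
    Xr = rng.normal(size=(500, p))  # 500 random training windows
    q = rng.normal(size=(1, p))     # one query window
    d = np.linalg.norm(Xr - q, axis=1)
    ratios[p] = d.min() / d.max()   # ratio -> 1 as p grows: distances concentrate
    print(p, round(ratios[p], 3))
```

When the nearest/farthest ratio approaches 1, the k "nearest" windows are barely more similar than random ones.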

Extensions#

  • Use DTW for more flexible shape matching (slower).

  • Add calendar / exogenous features by concatenating them to the window vector.

  • Predict multi-step horizons directly (one model per horizon) instead of recursing, by setting \(h > 1\) so the target becomes \(y_{t+h-1}\).
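The last point is what the horizon parameter of make_supervised_windows already supports; a self-contained sketch of the direct h-step dataset construction, using `sliding_window_view` as a stand-in for the notebook's loop:

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def direct_windows(y, L, h):
    """Direct h-step dataset: X[i] ends at index i+L-1, its target is y[i+L+h-1]."""
    y = np.asarray(y, dtype=float)
    n = y.size - L - h + 1
    return sliding_window_view(y, L)[:n], y[L + h - 1 :]


y_toy = np.arange(10.0)
X3, t3 = direct_windows(y_toy, L=4, h=3)
print(X3.shape, t3)  # each target sits 3 steps past its window's last point
```

Fitting one KNN model per horizon on such datasets avoids the error accumulation of recursive forecasting, at the cost of one model (and one neighbor search) per step.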